﻿using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Cyss.Core
{
    /// <summary>
    /// 自定义并行任务处理方法,主要针对异步处理方法，
    /// </summary>
    public static class ParallelExtensions
    {
        /// <summary>
        /// 
        /// </summary>
        /// <typeparam name="TSource"></typeparam>
        /// <param name="source">数据源</param>
        /// <param name="Fun">处理方法</param>
        /// <param name="maxTask">最大任务数</param>
        /// <returns></returns>
        public static async Task ForEach<TSource>(IEnumerable<TSource> source, Func<TSource, Task> Fun, int maxTask = 3)
        {
            await Task.Run(async () =>
             {
                 MyParallel<TSource> myParallel = new MyParallel<TSource>(source, Fun, maxTask);
                 myParallel.Start();
                 while (myParallel.IsComplete() == false)
                 {
                     await Task.Delay(10);
                 }
             });
        }
    }

    /// <summary>
    /// 
    /// </summary>

    internal class MyParallel<TSource>
    {
        /// <summary>
        /// 
        /// </summary>
        private int Index = 0;
        /// <summary>
        /// 最大并行任务数量
        /// </summary>
        private int MaxTask { set; get; }

        private ConcurrentQueue<TSource> Sources { set; get; }

        public Func<TSource, Task> Action { get; set; }

        /// <summary>
        /// 锁
        /// </summary>
        private static object lockObject = new object();

        public MyParallel(IEnumerable<TSource> sources, Func<TSource, Task> action, int maxTask = 3)
        {
            Sources = new ConcurrentQueue<TSource>();
            this.MaxTask = maxTask;
            foreach (var item in sources)
            {
                Sources.Enqueue(item);
            }
            this.Action = action;
        }

        /// <summary>
        /// 开始任务
        /// </summary>
        public void Start()
        {
            for (int i = 1; i <= MaxTask; i++)
            {
                Task.Run(Run);
            }
        }

        /// <summary>
        /// 任务是否执行完成
        /// </summary>
        /// <returns></returns>
        public bool IsComplete()
        {
            if (Sources.Count() == 0 && Index == 0)
            {
                return true;
            }
            return false;
        }


        /// <summary>
        /// 添加计数器
        /// </summary>
        private void AddCounter()
        {
            lock (lockObject)
            {
                Index++;
            }
        }

        /// <summary>
        /// 减少计数器
        /// </summary>
        private void ReduceCounter()
        {
            lock (lockObject)
            {
                Index--;
            }
        }
        private async void Run()
        {
            TSource source;
            var IsReadSuccess = Sources.TryDequeue(out source);
            while (IsReadSuccess)
            {
                AddCounter();
                try
                {
                    await Action.Invoke(source);
                }
                catch (Exception ex)
                {

                }
                ReduceCounter();
                IsReadSuccess = Sources.TryDequeue(out source);
            }
        }

    }
}
